Spark算子学习之四、其他算子 | 您所在的位置:网站首页 › rdd sortby排序 › Spark算子学习之四、其他算子 |
1.sortBy
1)函数签名
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
2)功能描述
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。注意:排序后新产生的RDD的分区数与原RDD的分区数一致。 3)简单案例 案例一、 sc.makeRDD(List((2, "a"), (2, "a"), (2, "a"), (2, "a"), (3, "b"), (5, "c")), 3) .sortBy(_._1, true).collect().foreach(println)结果 (2,a) (2,a) (2,a) (2,a) (3,b) (5,c) 案例二、 sc.makeRDD(List((2, "a"), (2, "a"), (2, "a"), (2, "a"), (3, "b"), (5, "c")), 3) .sortBy(_._1, true).mapPartitionsWithIndex( (index, datas) => { datas.map((index, _)) } ).foreach(println)(1,(3,b)) (0,(2,a)) (2,(5,c)) (0,(2,a)) (0,(2,a)) (0,(2,a)) 从另个案例结果可以看到,其实sortby在处理数据时使用了range分区,不同分区取得值是某个范围内的数据,分区之间数据不重叠,但是从案例二结果中可以看到,0号分区中的数据很多,如果数据量大则可能出现数据倾斜。 2.filter 1)函数签名 def filter(f: T => Boolean): RDD[T] 2)功能描述接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中,不会改变分区。 3)简单案例 val result = sc.makeRDD(List("abc", "bac", "ad", "cb"), 3) result.filter(_.startsWith("a")) .mapPartitionsWithIndex((index, datas) => { datas.map((index, _)) }) .foreach(println)结果 (2,ad) (0,abc) filter结果很简单,由于filter算子不改变分区数量,所以在生产环境可能会出现数据倾斜,因为不同分区过滤掉的数据量不同。 3.glom 1)函数签名 def glom(): RDD[Array[T]] 2)功能描述将RDD中每一个分区变成一个数组,并放置在新的RDD中,数组中元素的类型与原分区中元素类型一致。 3)简单案例 sc.makeRDD(List(4, 2, 1, 3), 2) .mapPartitionsWithIndex((index, datas) => { datas.map((s"分区$index:" + _)) }).foreach(println) println("xxxxxxxxxxx") //重点关注返回结果数据类型 val result:RDD[Array[Int]] = sc.makeRDD(List(4, 2, 1, 3), 2) .glom() result.mapPartitionsWithIndex((index, datas) => { datas.map { list => { for (l datas.map((index, _)) }).foreach(println) println("xxxxxxxxxxxxxxxx") result.flatMap(_.iterator) .distinct() .mapPartitionsWithIndex((index, datas) => { datas.map((index, _)) }).foreach(println)结果 (2,ad) (1,bac) (0,abc) (2,cb) xxxxxxxxxxxxxxxx (2,b) (1,d) (0,c) (1,a) 5.coalesce 1)函数签名 def coalesce(numPartitions: Int, shuffle: Boolean = false, //默认false不执行shuffle partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T] 2)功能描述缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。coalesce方法包含两种情况,一种使用shuffle一种不使用shuffle。 3)简单案例 案例一、不使用shuffle扩大分区 val result = sc.makeRDD(List("a", "b", "c", "d"),1) result .coalesce(2,false) .mapPartitionsWithIndex((index,datas) => { datas.map((index,_)) }).foreach(println)结果 (0,a) (0,b) (0,c) (0,d) 注意这里虽然将分区数从2个变成3个,但是最终结果还是两个分区。 案例二、使用shuffle扩大分区 val result = sc.makeRDD(List("a", "b", "c", "d"),1) result .coalesce(2,true) .mapPartitionsWithIndex((index,datas) => { datas.map((index,_)) }).foreach(println)结果 (0,a) (1,b) (0,c) (1,d) 案例三、缩小分区 val result = sc.makeRDD(List("a", "b", "c", "d"),2) result .coalesce(1) .mapPartitionsWithIndex((index,datas) => { datas.map((index,_)) }).foreach(println)结果 (0,a) (0,b) (0,c) (0,d) 结合以上几个案例,其实coalesce一般用来缩小分区。 6.repartition 1)函数签名 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] 2)功能描述该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。 3)简单案例 sc.makeRDD(List(2,3,5),1) .repartition(2) .mapPartitionsWithIndex((index,datas) =>{ datas.map((index,_)) }).collect().foreach(println)结果 (0,2) (0,5) (1,3) 4)coalesce和repartition区别1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 2)repartition实际上是调用的coalesce,进行shuffle。源码如下: def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }3)coalesce一般为缩减分区,如果扩大分区,也不会增加分区总数,意义不大。repartition扩大分区执行shuffle,可以达到扩大分区的效果。 |
CopyRight 2018-2019 实验室设备网 版权所有 |